﻿#include"XQTaskPool.h"
#include"XQRunnable.h"
#include"XQFuncEventFilter.hpp"
#include"XQFuncEvent.h"
#include<QCoreApplication>
#include<QThreadPool>
#include<QEventLoop>
#include<QDebug>
// Constructs the pool, forwarding `parent` to the QThreadPool base, and then
// runs the one-time setup in init().
XQTaskPool::XQTaskPool(QObject* parent)
	: QThreadPool(parent)
{
	this->init();
}

// Requests interruption of every registered task, then calls wait_event(),
// which returns once the task list is empty or finish() has been emitted.
XQTaskPool::~XQTaskPool()
{
	this->requestInterruption();
	this->wait_event();
}

// Returns the number of tasks currently registered in the pool.
// The task list is read under a read lock.
size_t XQTaskPool::size()
{
	QReadLocker guard(&m_taskLock);
	const size_t taskCount = m_taskList.size();
	return taskCount;
}

// Returns true when no task is currently registered in the pool.
// The task list is read under a read lock.
bool XQTaskPool::isEmpty()
{
	QReadLocker guard(&m_taskLock);
	const bool noTasks = m_taskList.empty();
	return noTasks;
}

// Returns true when `task` is currently registered in the pool.
// The lookup is performed under a read lock.
bool XQTaskPool::isExists(XQRunnable* task)
{
	QReadLocker guard(&m_taskLock);
	const auto position = m_taskList.find(task);
	return position != m_taskList.end();
}

// Returns a copy of the pool's name.
QString XQTaskPool::name() const
{
	return this->m_name;
}
void XQTaskPool::setName(const QString& name)
{
	m_name=name;
}

// Registers `task` with this pool and hands it to QThreadPool::start() with
// the given `priority`. Returns the task that was passed in (nullptr is
// returned unchanged and nothing is scheduled).
XQRunnable* XQTaskPool::addTask(XQRunnable* task, int priority)
{
	if (!task)
		return nullptr;

	// Auto-delete is switched off here; freeTask() is the place in this file
	// that deletes a task.
	task->setTaskPool(this);
	task->setAutoDelete(false);

	QWriteLocker guard(&m_taskLock);
	m_taskList.insert(task);
	const bool onlyTask = (m_taskList.size() == 1);
	if (onlyTask)
	{
		// The list holds exactly one task after the insert: post the three
		// start() signal variants through an XQFuncEvent.
		new XQFuncEvent(this, [this] {
			emit start();
			emit start(name());
			emit start(this);
			});
	}
	QThreadPool::start(task, priority);
	return task;
}

// Wraps a parameterless callable in a runnable made by XQRunnable::create()
// and schedules it through addTask(XQRunnable*, int). Returns that runnable.
XQRunnable* XQTaskPool::addTask(std::function<void()>&& task, int priority)
{
	XQRunnable* const runnable = XQRunnable::create(task);
	return addTask(runnable, priority);
}
// Wraps a callable taking an XQRunnable* in a runnable made by
// XQRunnable::create() and schedules it through addTask(XQRunnable*, int).
// Returns that runnable.
XQRunnable* XQTaskPool::addTask(std::function<void(XQRunnable*)>&& task, int priority)
{
	XQRunnable* const runnable = XQRunnable::create(task);
	return addTask(runnable, priority);
}
// Retires `task`: requests its interruption, removes it from the task list,
// waits for it to finish, deletes it, wakes the blocking wait() callers and
// posts the finish notifications.
// If `task` is not registered (nullptr included) nothing happens beyond the
// interruption request on a non-null task.
void XQTaskPool::freeTask(XQRunnable* task)
{
	if (task)
		task->requestInterruption();
	QWriteLocker lock(&m_taskLock);
	if (m_taskList.find(task) == m_taskList.end())
		return;
	m_taskList.erase(task);
	// Release the lock before blocking on the task.
	lock.unlock();
	task->wait();// wait until the task has finished executing
	delete task;// release it
	m_waitOne.notify_all();
	if (isEmpty())
		m_waitAll.notify_all();
	// Post the signals through an XQFuncEvent (the original comment describes
	// this as asynchronous delivery). `task` has already been deleted above,
	// so the pointer carried by taskFinish() is only an identifier.
	new XQFuncEvent(this, [=] {
		emit taskFinish(task);
		// All three finish() variants belong to the "pool is now empty" case;
		// without braces only the first one was guarded and the other two
		// fired after every single task.
		if (isEmpty())
		{
			emit finish();
			emit finish(name());
			emit finish(this);
		}
		});
}

// One-time setup: installs an XQFuncEventFilter (parented to this pool) as an
// event filter on the pool itself.
void XQTaskPool::init()
{
	auto* funcFilter = new XQFuncEventFilter(this);
	installEventFilter(funcFilter);
	// Disabled: setMaxThreadCount(QThread::idealThreadCount() * 0.8);
}

// Announces that `task` has started by posting an XQFuncEvent whose callback
// emits taskStart(task).
void XQTaskPool::runTask(XQRunnable* task)
{
	// No lock is taken here (a write lock on m_lock is left disabled).
	auto announceStart = [=] {
		emit taskStart(task);
		};
	new XQFuncEvent(this, announceStart);
}

// Calls requestInterruption() on every registered task and then emits
// requestQuit(). The read lock is held for the whole function, including the
// signal emission.
void XQTaskPool::requestInterruption()
{
	QReadLocker guard(&m_taskLock);
	for (auto it = m_taskList.begin(); it != m_taskList.end(); ++it)
		(*it)->requestInterruption();
	emit requestQuit();
}

// Blocks the calling thread on m_waitAll if any task is registered; returns
// immediately when the task list is already empty.
void XQTaskPool::wait()
{
	QReadLocker guard(&m_taskLock);
	if (m_taskList.empty())
		return;
	m_waitAll.wait(&m_taskLock);
}

// Blocks the calling thread until `task` is no longer in the task list.
// The membership check is repeated after every wake-up of m_waitOne.
void XQTaskPool::wait(XQRunnable* task)
{
	for (;;)
	{
		QReadLocker guard(&m_taskLock);
		const bool stillRegistered = m_taskList.find(task) != m_taskList.end();
		if (!stillRegistered)
			return;
		m_waitOne.wait(&m_taskLock);
	}
}
// Blocks the calling thread until the number of registered tasks is no longer
// greater than `count`. The size check is repeated after every wake-up of
// m_waitOne.
void XQTaskPool::wait(int count)
{
	for (;;)
	{
		QReadLocker guard(&m_taskLock);
		if (!(m_taskList.size() > count))
			return;
		m_waitOne.wait(&m_taskLock);
	}
}

void XQTaskPool::wait_event()
{
	if (isEmpty())
		return;
	QReadLocker lock(&m_taskLock);
	QEventLoop loop;
	connect(this, QOverload<>::of(&XQTaskPool::finish), &loop, &QEventLoop::quit);
	if(!isEmpty())
	{
		lock.unlock();
		loop.exec();
		//进入事件循环等待
	}
}

// Waits for `task` to finish by running a local QEventLoop. The loop quits when
// taskFinish() reports `task`, or when the pool is empty at the time a
// taskFinish() arrives. Returns immediately for a null task, an empty pool, or
// a task that is not registered.
void XQTaskPool::wait_event(XQRunnable* task)
{
	if (isEmpty() || task == nullptr)
		return;

	QReadLocker guard(&m_taskLock);
	QEventLoop loop;
	QEventLoop* const pLoop = &loop;
	auto onTaskFinish = [this, task, pLoop](XQRunnable* finished) {
		const bool shouldStop = isEmpty() || finished == task;
		if (shouldStop)
			pLoop->quit();
		};
	auto conn = connect(this, QOverload<XQRunnable*>::of(&XQTaskPool::taskFinish), onTaskFinish);

	const bool registered = m_taskList.find(task) != m_taskList.end();
	if (registered)
	{
		// Release the lock before blocking in the event loop.
		guard.unlock();
		loop.exec();
	}
	disconnect(conn);
}

void XQTaskPool::wait_event(const QList<XQRunnable*>& taskList)
{
	if (isEmpty())
		return;
	QReadLocker lock(&m_taskLock);
	QEventLoop loop, * pLoop = &loop;
	QSet<XQRunnable*> list,*pList=&list;
	auto conn = connect(this, QOverload<XQRunnable*>::of(&XQTaskPool::taskFinish), [=](XQRunnable* run)
		{
			if (pList->contains(run))
				pList->remove(run);
			if (isEmpty() || pList->isEmpty())
				pLoop->quit();
		});
	for (auto&task: taskList)
	{
		if (m_taskList.find(task)!=m_taskList.end())
			list << task;
	}
	if (!list.isEmpty())
	{
		lock.unlock();
		loop.exec();
	}
	disconnect(conn);
}

void XQTaskPool::wait_event(int count)
{
	if (isEmpty())
		return;
	QReadLocker lock(&m_taskLock);
	QEventLoop loop;
	auto conn = connect(this, QOverload<XQRunnable*>::of(&XQTaskPool::taskFinish), [&](XQRunnable* run) {if (isEmpty() || (size() == count))loop.quit(); });
	if (size()>count)
	{
		lock.unlock();
		loop.exec();
	}
	disconnect(conn);
}

// Waits, while running a local QEventLoop, until `task` is the only task left
// in the pool (or the pool is empty). Returns immediately when the pool is
// empty or `task` is not registered.
// NOTE(review): if `task` is already the only registered task on entry, the
// loop is still entered and waits for the next taskFinish() - confirm that
// this is intended.
void XQTaskPool::waitNot(XQRunnable* task)
{
	if (isEmpty())
		return;
	QReadLocker lock(&m_taskLock);
	QEventLoop loop;
	auto conn = connect(this, QOverload<XQRunnable*>::of(&XQTaskPool::taskFinish), [&](XQRunnable*) {
		bool onlyTaskLeft = false;
		{
			// Evaluate the whole predicate under one read lock. Previously
			// begin() was dereferenced without the lock, after size() had
			// released it, so a concurrent erase could race with the read or
			// leave the list empty.
			QReadLocker guard(&m_taskLock);
			onlyTaskLeft = m_taskList.empty()
				|| (m_taskList.size() == 1 && *m_taskList.begin() == task);
		}
		if (onlyTaskLeft)
			loop.quit();
		});
	if (m_taskList.find(task) != m_taskList.end())
	{
		// Release the lock before blocking in the event loop.
		lock.unlock();
		loop.exec();
	}
	disconnect(conn);
}
